package com.atguigu.utils;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.Constant;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

/**
 * Static helpers for common HBase operations: opening connections, creating and
 * dropping tables, writing, deleting and reading single rows, and computing a
 * pre-split row-key prefix.
 *
 * <p>All String-to-byte conversions use UTF-8 so that row keys, column names and
 * values do not depend on the JVM's platform default charset.
 */
public class HBaseUtil {

    /**
     * Opens a synchronous HBase connection using the configuration returned by
     * {@link HBaseConfiguration#create()}.
     *
     * @return a new connection; the caller is responsible for closing it
     * @throws IOException if the connection cannot be created
     */
    public static Connection getConnection() throws IOException {
        return ConnectionFactory.createConnection(HBaseConfiguration.create());
    }

    /**
     * Opens an asynchronous HBase connection and blocks until it is ready.
     *
     * @return a new asynchronous connection; the caller is responsible for closing it
     * @throws IOException          declared for backward compatibility with existing callers
     * @throws ExecutionException   if creating the connection failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static AsyncConnection getAsyncConnection() throws IOException, ExecutionException, InterruptedException {
        return ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get();
    }

    /**
     * Creates the table {@code nameSpace:tableName} with the given column families
     * if it does not exist yet. If it already exists, a message is printed and
     * nothing else happens.
     *
     * @param connection     open HBase connection (not closed by this method)
     * @param nameSpace      HBase namespace
     * @param tableName      table name without the namespace
     * @param splitKey       pre-split keys, or {@code null} for a single-region table
     * @param columnFamilies one or more column family names
     * @throws IllegalArgumentException if no column family is supplied
     * @throws IOException              if an HBase admin call fails
     */
    public static void createTable(Connection connection, String nameSpace, String tableName, byte[][] splitKey, String... columnFamilies) throws IOException {

        // A table cannot be created without at least one column family.
        if (columnFamilies == null || columnFamilies.length == 0) {
            throw new IllegalArgumentException("请设置列族信息！");
        }

        TableName tName = qualifiedName(nameSpace, tableName);

        // try-with-resources: the Admin is released even when an admin call throws.
        try (Admin admin = connection.getAdmin()) {
            if (admin.tableExists(tName)) {
                System.out.println("待创建的表:" + tableName + "已经存在！");
                return;
            }

            // Build one descriptor per requested column family.
            List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamilies.length);
            for (String columnFamily : columnFamilies) {
                familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(utf8(columnFamily)).build());
            }

            TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tName)
                    .setColumnFamilies(familyDescriptors)
                    .build();

            if (splitKey == null) {
                admin.createTable(tableDescriptor);
            } else {
                admin.createTable(tableDescriptor, splitKey);
            }
        }
    }

    /**
     * Drops the table {@code nameSpace:tableName} if it exists.
     *
     * @param connection open HBase connection (not closed by this method)
     * @param nameSpace  HBase namespace
     * @param tableName  table name without the namespace
     * @throws IOException if an HBase admin call fails
     */
    public static void dropTable(Connection connection, String nameSpace, String tableName) throws IOException {

        TableName tName = qualifiedName(nameSpace, tableName);

        try (Admin admin = connection.getAdmin()) {
            if (admin.tableExists(tName)) {
                // Only disable when needed, so a table that is already disabled
                // (e.g. after a previously interrupted drop) can still be deleted.
                if (admin.isTableEnabled(tName)) {
                    admin.disableTable(tName);
                }
                admin.deleteTable(tName);
            }
        }
    }

    /**
     * Writes one row: every non-null entry of {@code data} becomes a column in
     * {@code columnFamily}, with the entry key as qualifier and the value's
     * {@code toString()} as cell value.
     *
     * @param connection   open HBase connection (not closed by this method)
     * @param nameSpace    HBase namespace
     * @param tableName    table name without the namespace
     * @param rowKey       row key of the row to write
     * @param columnFamily column family receiving all columns
     * @param data         column name to value mapping; null values are skipped
     * @throws IOException if the put fails
     */
    public static void putData(Connection connection, String nameSpace, String tableName, String rowKey, String columnFamily, JSONObject data) throws IOException {

        try (Table table = connection.getTable(qualifiedName(nameSpace, tableName))) {
            Put put = new Put(utf8(rowKey));
            byte[] family = utf8(columnFamily);

            for (Map.Entry<String, Object> entry : data.entrySet()) {
                Object value = entry.getValue();
                if (value == null) {
                    // Null values are not stored; the column is simply omitted.
                    continue;
                }
                put.addColumn(family, utf8(entry.getKey()), utf8(value.toString()));
            }

            table.put(put);
        }
    }

    /**
     * Prefixes {@code rowKey} with a partition prefix chosen by the row key's hash.
     *
     * <p>The candidate prefixes are every split key with {@code |} replaced by
     * {@code _}, plus the last split key unchanged. For split keys
     * {@code "01|,02|"} the candidates are {@code 01_}, {@code 02_} and {@code 02|}.
     *
     * @param rowKey   "1001"
     * @param splitKey "01|,02|"
     * @return 01_1001  02_1001  02|1001
     */
    public static String getRowKey(String rowKey, String splitKey) {

        String[] keys = splitKey.split(",");

        // Collect the candidate partition prefixes.
        List<String> prefixes = new ArrayList<>(keys.length + 1);
        for (String key : keys) {
            prefixes.add(key.replace("|", "_"));
        }
        prefixes.add(keys[keys.length - 1]);

        // hashCode() may be negative; floorMod keeps the index within [0, size).
        int index = Math.floorMod(rowKey.hashCode(), prefixes.size());
        return prefixes.get(index) + rowKey;
    }

    /**
     * Deletes the whole row identified by {@code rowKey}.
     *
     * @param connection open HBase connection (not closed by this method)
     * @param nameSpace  HBase namespace
     * @param tableName  table name without the namespace
     * @param rowKey     row key of the row to delete
     * @throws IOException if the delete fails
     */
    public static void deleteData(Connection connection, String nameSpace, String tableName, String rowKey) throws IOException {

        try (Table table = connection.getTable(qualifiedName(nameSpace, tableName))) {
            table.delete(new Delete(utf8(rowKey)));
        }
    }

    /**
     * Returns the {@code create table} DDL string that maps the HBase table
     * {@code gmall_230315:dim_base_dic} through the {@code hbase-2.2} connector.
     *
     * @return the DDL statement text
     */
    public static String getBaseDicDDL() {
        return "create table base_dic(\n" +
                "    dic_code string,\n" +
                "    info ROW<dic_name string>\n" +
                ") with (\n" +
                "    'connector' = 'hbase-2.2',\n" +
                "    'table-name' = 'gmall_230315:dim_base_dic',\n" +
                "    'zookeeper.quorum' = 'hadoop102:2181,hadoop103:2181,hadoop104:2181'\n" +
                ")";
    }

    /**
     * Reads one row from a table in the namespace {@code Constant.HBASE_NAME_SPACE}.
     *
     * @param connection open HBase connection (not closed by this method)
     * @param tableName  table name without the namespace
     * @param rowKey     row key to look up
     * @return column qualifier to value mapping; empty if the row has no cells
     * @throws IOException if the get fails
     */
    public static JSONObject getData(Connection connection, String tableName, String rowKey) throws IOException {

        try (Table table = connection.getTable(qualifiedName(Constant.HBASE_NAME_SPACE, tableName))) {
            return toJson(table.get(new Get(utf8(rowKey))));
        }
    }


    /**
     * Reads one row through an asynchronous connection, blocking until the
     * result is available.
     *
     * @param connection open asynchronous HBase connection (not closed by this method)
     * @param tableName  table name without the namespace
     * @param rowKey     row key to look up
     * @return column qualifier to value mapping; empty if the row has no cells
     * @throws IOException          declared for backward compatibility with existing callers
     * @throws ExecutionException   if the asynchronous get failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static JSONObject getData(AsyncConnection connection, String tableName, String rowKey) throws IOException, ExecutionException, InterruptedException {

        AsyncTable<AdvancedScanResultConsumer> asyncTable =
                connection.getTable(qualifiedName(Constant.HBASE_NAME_SPACE, tableName));

        Result result = asyncTable.get(new Get(utf8(rowKey))).get();
        return toJson(result);
    }

    /**
     * Manual check: reads the same row twice and prints both results followed by
     * the elapsed milliseconds of each read. Notes left in the original code
     * recorded roughly 1100 ms for the first read and 2-3 ms for the second.
     */
    public static void main(String[] args) throws IOException {
        try (Connection connection = getConnection()) {
            long start = System.currentTimeMillis();
            System.out.println(getData(connection, "dim_sku_info", "1"));
            long end = System.currentTimeMillis();
            System.out.println(getData(connection, "dim_sku_info", "1"));
            long end2 = System.currentTimeMillis();

            System.out.println(end - start);
            System.out.println(end2 - end);
        }
    }

    /** Builds the {@code namespace:table} qualified table name. */
    private static TableName qualifiedName(String nameSpace, String tableName) {
        return TableName.valueOf(nameSpace + ":" + tableName);
    }

    /** Encodes text as UTF-8 bytes, independent of the platform default charset. */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Copies every cell of a row into a JSON object keyed by column qualifier. */
    private static JSONObject toJson(Result result) {
        JSONObject jsonObject = new JSONObject();
        // An empty Result may expose a null cell array, so check before iterating.
        if (result == null || result.isEmpty()) {
            return jsonObject;
        }
        for (Cell cell : result.rawCells()) {
            jsonObject.put(
                    new String(CellUtil.cloneQualifier(cell), StandardCharsets.UTF_8),
                    new String(CellUtil.cloneValue(cell), StandardCharsets.UTF_8));
        }
        return jsonObject;
    }

}
